{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Simple AutoML for time series with Ray Core\n",
    "\n",
    "<a id=\"try-anyscale-quickstart-automl_for_time_series\" href=\"https://console.anyscale.com/register/ha?render_flow=ray&utm_source=ray_docs&utm_medium=docs&utm_campaign=automl_for_time_series\">\n",
    "    <img src=\"../../_static/img/run-on-anyscale.svg\" alt=\"try-anyscale-quickstart\">\n",
    "</a>\n",
    "<br></br>"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```{tip}\n",
    "We strongly recommend using [Ray Tune](tune-main) for hyperparameter tuning and AutoML. It lets you build such systems faster and more easily, and provides built-in benefits such as logging and fault tolerance. If you think Ray Tune cannot support your use case, we'd love to get your feedback, for example through a [Ray GitHub issue](https://github.com/ray-project/ray/issues).\n",
    "```\n",
    "\n",
    "AutoML (Automatic Machine Learning) is a broad topic, but in essence, it boils down to choosing the best model (and possibly preprocessing) for the task and dataset at hand. While there exist multiple advanced AutoML frameworks, we can quickly build a simple solution using just Ray Core and stateless tasks.\n",
    "\n",
    "If you are interested in applying more advanced optimization algorithms or would like to take advantage of a greater level of abstraction and multiple built-in features, we highly recommend using [Ray Tune's Tuner](tune-60-seconds).\n",
    "\n",
    "In this notebook, we will build an AutoML (or more precisely, an AutoTS) system which will choose the best combination of a [statsforecast](https://github.com/Nixtla/statsforecast) model and hyperparameters for a time series regression task - here, we will be using a partition of the [M5 dataset](https://www.kaggle.com/c/m5-forecasting-accuracy)."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Simple AutoML consists of running different functions (hyperparameter configurations) on the same data independently of each other. We will want to train models with different configurations and evaluate them to obtain various metrics, such as mean squared error. After all configurations have been evaluated, we will be able to choose the best configuration according to the metric we want to use.\n",
    "\n",
    "![AutoML](./images/automl.svg)\n",
    "\n",
    "To make this example more practical, we will be using [time series cross-validation (CV)](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.TimeSeriesSplit.html) as our evaluation strategy. Cross-validation works by evaluating a model k times, each time choosing a different subset (fold) of the data for training and evaluation. In time series CV, each training set contains only observations that precede its test set, so a model is never evaluated on data older than what it was trained on. This allows for more robust estimation of performance and helps prevent overfitting, especially with small data. In other words, we will be running n * k separate evaluations, where n is the number of configurations and k is the number of folds."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Walkthrough\n",
    "\n",
    "Let’s start by importing Ray and initializing a local Ray cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import List, Union, Callable, Dict, Type, Tuple\n",
    "import time\n",
    "import ray\n",
    "import itertools\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "from collections import defaultdict\n",
    "from statsforecast import StatsForecast\n",
    "from statsforecast.models import AutoETS, AutoARIMA, _TS\n",
    "from pyarrow import parquet as pq\n",
    "from sklearn.model_selection import TimeSeriesSplit\n",
    "from sklearn.metrics import mean_squared_error, mean_absolute_error"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray.init(ignore_reinit_error=True)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will break up our logic into several functions and a Ray [task](ray-remote-functions).\n",
    "\n",
    "The Ray task is `train_and_evaluate_fold`, which contains all the logic necessary to fit and evaluate a model on a CV fold of data. We structure our task to take in a dataset and indices splitting it into train and test - that way, we can keep one instance of the dataset in the Ray object store and split it in each task separately. We are defining this as a Ray task as we want all folds to be evaluated in parallel on a Ray cluster - Ray will handle all orchestration and execution. Each task will reserve 1 CPU core by default."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@ray.remote\n",
    "def train_and_evaluate_fold(\n",
    "    model: _TS,\n",
    "    df: pd.DataFrame,\n",
    "    train_indices: np.ndarray,\n",
    "    test_indices: np.ndarray,\n",
    "    label_column: str,\n",
    "    metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],\n",
    "    freq: str = \"D\",\n",
    ") -> Dict[str, float]:\n",
    "    try:\n",
    "        # Create the StatsForecast object with train data & model.\n",
    "        statsforecast = StatsForecast(models=[model], freq=freq)\n",
    "        # Make a forecast and calculate metrics on test data.\n",
    "        # This will fit the model first automatically.\n",
    "        forecast = statsforecast.forecast(h=len(test_indices), df=df.iloc[train_indices])\n",
    "        return {\n",
    "            metric_name: metric(\n",
    "                df.iloc[test_indices][label_column], forecast[model.__class__.__name__]\n",
    "            )\n",
    "            for metric_name, metric in metrics.items()\n",
    "        }\n",
    "    except Exception as ex:\n",
    "        print(f\"Exception generating forecast for model {model}: {ex}\")\n",
    "        # In case the model fit or eval fails, return NaN for all metrics.\n",
    "        # Unlike None, NaN can still be averaged across folds with np.mean.\n",
    "        return {metric_name: np.nan for metric_name in metrics}"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`evaluate_models_with_cv` is a driver function to run our optimization loop. We take in a list of models (with their parameters already set) and the dataframe.\n",
    "\n",
    "The dataframe is put into the Ray object store and reused, which means we only need to serialize it once. That way, we avoid an {ref}`ray-pass-large-arg-by-value`.\n",
    "\n",
    "We treat the fitting of each fold as a separate task. We generate k tasks for each model and wait for them to complete by calling `ray.get()`, which blocks until all tasks finish and returns the results in the same order as the task references passed to it. We then aggregate the returned metrics to calculate the mean of each metric across folds for each model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def evaluate_models_with_cv(\n",
    "    models: List[_TS],\n",
    "    df: pd.DataFrame,\n",
    "    label_column: str,\n",
    "    metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],\n",
    "    freq: str = \"D\",\n",
    "    cv: Union[int, TimeSeriesSplit] = 5,\n",
    ") -> Dict[_TS, Dict[str, float]]:\n",
    "    # Obtain CV train-test indices for each fold.\n",
    "    if isinstance(cv, int):\n",
    "        cv = TimeSeriesSplit(cv)\n",
    "    train_test_indices = list(cv.split(df))\n",
    "\n",
    "    # Put df into Ray object store for better performance.\n",
    "    df_ref = ray.put(df)\n",
    "\n",
    "    # Add tasks to be executed for each fold.\n",
    "    fold_refs = []\n",
    "    for model in models:\n",
    "        fold_refs.extend(\n",
    "            [\n",
    "                train_and_evaluate_fold.remote(\n",
    "                    model,\n",
    "                    df_ref,\n",
    "                    train_indices,\n",
    "                    test_indices,\n",
    "                    label_column,\n",
    "                    metrics,\n",
    "                    freq=freq,\n",
    "                )\n",
    "                for train_indices, test_indices in train_test_indices\n",
    "            ]\n",
    "        )\n",
    "\n",
    "    # Wait on all tasks. ray.get() returns results in the same order as\n",
    "    # the refs it is given, which the chunking below relies on.\n",
    "    # (Collecting via ray.wait() would yield results in completion order.)\n",
    "    fold_results = ray.get(fold_refs)\n",
    "\n",
    "    # Split fold results into a list of CV splits-sized chunks,\n",
    "    # one chunk per model.\n",
    "    fold_results_per_model = [\n",
    "        fold_results[i : i + len(train_test_indices)]\n",
    "        for i in range(0, len(fold_results), len(train_test_indices))\n",
    "    ]\n",
    "\n",
    "    # Aggregate and average results from all folds per model.\n",
    "    # We go from a list of dicts to a dict of lists and then\n",
    "    # get a mean of those lists.\n",
    "    mean_results_per_model = []\n",
    "    for model_results in fold_results_per_model:\n",
    "        aggregated_results = defaultdict(list)\n",
    "        for fold_result in model_results:\n",
    "            for metric, value in fold_result.items():\n",
    "                aggregated_results[metric].append(value)\n",
    "        mean_results = {\n",
    "            metric: np.mean(values) for metric, values in aggregated_results.items()\n",
    "        }\n",
    "        mean_results_per_model.append(mean_results)\n",
    "\n",
    "    # Join models and their metrics together.\n",
    "    mean_results_per_model = {\n",
    "        models[i]: mean_results_per_model[i] for i in range(len(mean_results_per_model))\n",
    "    }\n",
    "    return mean_results_per_model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, we have to define the logic to translate a dictionary search space into instantiated models we can pass to `evaluate_models_with_cv`.\n",
    "\n",
    "```{note}\n",
    "scikit-learn and statsforecast models can be easily serialized and are very small, meaning instantiated models can be easily passed around the Ray cluster. With other frameworks, such as Torch, you may want to instead instantiate the model in the task that fits it in order to avoid issues.\n",
    "```\n",
    "\n",
    "Our `generate_configurations` generator translates a two-level dictionary, where the keys are the model classes and the values are dictionaries mapping argument names to lists of their possible values. We want to run a grid search, meaning we want to evaluate every possible hyperparameter combination for the given models.\n",
    "\n",
    "The search space we will be using later looks like this:\n",
    "```python\n",
    "{\n",
    "    AutoARIMA: {},\n",
    "    AutoETS: {\n",
    "        \"season_length\": [6, 7],\n",
    "        \"model\": [\"ZNA\", \"ZZZ\"]\n",
    "    }\n",
    "}\n",
    "```\n",
    "\n",
    "It will translate to the following models:\n",
    "```python\n",
    "AutoARIMA()\n",
    "AutoETS(season_length=6, model=\"ZNA\")\n",
    "AutoETS(season_length=6, model=\"ZZZ\")\n",
    "AutoETS(season_length=7, model=\"ZNA\")\n",
    "AutoETS(season_length=7, model=\"ZZZ\")\n",
    "```\n",
    "\n",
    "`evaluate_search_space_with_cv` is the entry point for our AutoML system, which takes in the search space, dataframe, label column, metrics, the metric to use to choose the best configuration, whether we want to minimize or maximize it, the frequency of the data and the scikit-learn `TimeSeriesSplit` cross-validation splitter to use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_configurations(search_space: Dict[Type[_TS], Dict[str, list]]) -> _TS:\n",
    "    # Convert dict search space into configurations - models instantiated with specific arguments.\n",
    "    for model, model_search_space in search_space.items():\n",
    "        kwargs, values = model_search_space.keys(), model_search_space.values()\n",
    "        # Get a product - all combinations in the per-model grid.\n",
    "        for configuration in itertools.product(*values):\n",
    "            yield model(**dict(zip(kwargs, configuration)))\n",
    "\n",
    "\n",
    "def evaluate_search_space_with_cv(\n",
    "    search_space: Dict[Type[_TS], Dict[str, list]],\n",
    "    df: pd.DataFrame,\n",
    "    label_column: str,\n",
    "    metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],\n",
    "    eval_metric: str,\n",
    "    mode: str = \"min\",\n",
    "    freq: str = \"D\",\n",
    "    cv: Union[int, TimeSeriesSplit] = 5,\n",
    ") -> List[Tuple[_TS, Dict[str, float]]]:\n",
    "    assert eval_metric in metrics\n",
    "    assert mode in (\"min\", \"max\")\n",
    "\n",
    "    # Accept an integer number of splits as well as a splitter object,\n",
    "    # so that cv.get_n_splits() below works with the default cv=5.\n",
    "    if isinstance(cv, int):\n",
    "        cv = TimeSeriesSplit(cv)\n",
    "\n",
    "    configurations = list(generate_configurations(search_space))\n",
    "    print(\n",
    "        f\"Evaluating {len(configurations)} configurations with {cv.get_n_splits()} splits each, \"\n",
    "        f\"totalling {len(configurations)*cv.get_n_splits()} tasks...\"\n",
    "    )\n",
    "    ret = evaluate_models_with_cv(\n",
    "        configurations, df, label_column, metrics, freq=freq, cv=cv\n",
    "    )\n",
    "\n",
    "    # Sort the results by eval_metric\n",
    "    ret = sorted(ret.items(), key=lambda x: x[1][eval_metric], reverse=(mode == \"max\"))\n",
    "    print(\"Evaluation complete!\")\n",
    "    return ret"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With our system complete, we just need a quick helper function to obtain the data from an S3 bucket and preprocess it to the format statsforecast expects. As the dataset is quite large, we use PyArrow's push-down predicate as a filter to obtain just the rows we care about without having to load them all into memory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_m5_partition(unique_id: str) -> pd.DataFrame:\n",
    "    ds1 = pq.read_table(\n",
    "        \"s3://anonymous@m5-benchmarks/data/train/target.parquet\",\n",
    "        filters=[(\"item_id\", \"=\", unique_id)],\n",
    "    )\n",
    "    Y_df = ds1.to_pandas()\n",
    "    # StatsForecast expects specific column names!\n",
    "    Y_df = Y_df.rename(\n",
    "        columns={\"item_id\": \"unique_id\", \"timestamp\": \"ds\", \"demand\": \"y\"}\n",
    "    )\n",
    "    Y_df[\"unique_id\"] = Y_df[\"unique_id\"].astype(str)\n",
    "    Y_df[\"ds\"] = pd.to_datetime(Y_df[\"ds\"])\n",
    "    Y_df = Y_df.dropna()\n",
    "    constant = 10\n",
    "    Y_df[\"y\"] += constant\n",
    "    return Y_df[Y_df.unique_id == unique_id]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>unique_id</th>\n",
       "      <th>ds</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2011-01-29</td>\n",
       "      <td>13.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2011-01-30</td>\n",
       "      <td>10.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2011-01-31</td>\n",
       "      <td>10.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2011-02-01</td>\n",
       "      <td>11.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2011-02-02</td>\n",
       "      <td>14.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>...</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1936</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2016-05-18</td>\n",
       "      <td>10.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1937</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2016-05-19</td>\n",
       "      <td>11.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1938</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2016-05-20</td>\n",
       "      <td>10.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1939</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2016-05-21</td>\n",
       "      <td>10.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1940</th>\n",
       "      <td>FOODS_1_001_CA_1</td>\n",
       "      <td>2016-05-22</td>\n",
       "      <td>10.0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "<p>1941 rows × 3 columns</p>\n",
       "</div>"
      ],
      "text/plain": [
       "             unique_id         ds     y\n",
       "0     FOODS_1_001_CA_1 2011-01-29  13.0\n",
       "1     FOODS_1_001_CA_1 2011-01-30  10.0\n",
       "2     FOODS_1_001_CA_1 2011-01-31  10.0\n",
       "3     FOODS_1_001_CA_1 2011-02-01  11.0\n",
       "4     FOODS_1_001_CA_1 2011-02-02  14.0\n",
       "...                ...        ...   ...\n",
       "1936  FOODS_1_001_CA_1 2016-05-18  10.0\n",
       "1937  FOODS_1_001_CA_1 2016-05-19  11.0\n",
       "1938  FOODS_1_001_CA_1 2016-05-20  10.0\n",
       "1939  FOODS_1_001_CA_1 2016-05-21  10.0\n",
       "1940  FOODS_1_001_CA_1 2016-05-22  10.0\n",
       "\n",
       "[1941 rows x 3 columns]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df = get_m5_partition(\"FOODS_1_001_CA_1\")\n",
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can now run our AutoML system with our search space and obtain the best model with its configuration. We will be using scikit-learn implementations of mean squared error (MSE) and mean absolute error (MAE) as metrics, with the former being what we want to optimize for."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tuning_results = evaluate_search_space_with_cv(\n",
    "    {AutoARIMA: {}, AutoETS: {\"season_length\": [6, 7], \"model\": [\"ZNA\", \"ZZZ\"]}},\n",
    "    df,\n",
    "    \"y\",\n",
    "    {\"mse\": mean_squared_error, \"mae\": mean_absolute_error},\n",
    "    \"mse\",\n",
    "    cv=TimeSeriesSplit(test_size=1),\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see that the model with the lowest MSE in our search space is a ZNA AutoETS model with a season length of 6."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(tuning_results[0])\n",
    "\n",
    "# Print arguments of the model:\n",
    "print(tuning_results[0][0].__dict__)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.8.10 ('venv': venv)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  },
  "orig_nbformat": 4,
  "vscode": {
   "interpreter": {
    "hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
